package delayqueue

import (
	"errors"
	"fmt"
	"log"
	"math"
	"strconv"
	"sync"
	"time"

	"gitee.com/k8sio/cnkit/unique"
	"github.com/panjf2000/ants/v2"
)

// DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
type DelayQueue struct {
	// name for this Queue. Make sure the name is unique in redis database
	name          string
	redisCli      RedisCli
	cb            func(string) bool
	testKey       string // test key
	pendingKey    string // sorted set: message id -> delivery time
	readyKey      string // list
	unAckKey      string // sorted set: message id -> retry time
	retryKey      string // list
	retryCountKey string // hash: message id -> remain retry count
	garbageKey    string // set: message id
	useHashTag    bool
	ticker        *time.Ticker
	logger        Logger
	close         chan struct{}

	maxConsumeDuration      time.Duration // default 5 seconds
	msgTTL                  time.Duration // default 1 hour
	defaultRetryCount       uint          // default 3
	fetchInterval           time.Duration // default 1 second
	fetchLimit              uint          // default 100 limit
	concurrent              uint          // default 1, executed serially
	allowZAddMultipleParams bool          // default false, true: allow ZADD use multiple params, false: use single param

	eventListener EventListener
	quit          int
}

// NilErr represents redis nil
var NilErr = errors.New("nil")

// RedisCli is abstraction for redis client, required commands only not all commands
type RedisCli interface {
	// Cmder is abstraction for redis commands
	Cmder
	// Pipelined
	Pipelined(fn func(Cmder) error) error
	// Publish used for monitor only
	Publish(channel string, payload string) error
	// Subscribe used for monitor only
	// returns: payload channel, subscription closer, error; the subscription closer should close payload channel as well
	Subscribe(channel string) (payloads <-chan string, close func(), err error)
}

type Cmder interface {
	// Eval sends lua script to redis
	// args should be string, integer or float
	// returns string, int64, []interface{} (elements can be string or int64)
	Eval(script string, keys []string, args []interface{}) (interface{}, error)
	Set(key string, value string, expiration time.Duration) error
	// Get represents redis command GET
	// please NilErr when no such key in redis
	Get(key string) (string, error)
	Del(keys []string) error
	HSet(key string, values ...interface{}) error
	HDel(key string, fields []string) error
	SMembers(key string) ([]string, error)
	SRem(key string, members []string) error
	ZAdd(key string, values map[string]float64) error
	ZRem(key string, fields []string) error
	ZCard(key string) (int64, error)
	LLen(key string) (int64, error)
}

type hashTagKeyOpt int

// CallbackFunc receives and consumes messages
// returns true to confirm successfully consumed, false to re-deliver this message
type CallbackFunc = func(string) bool

// UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot.
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue
// WARNING! Changing (add or remove) this option will cause DelayQueue failing to read existed data in redis
// see more:  https://redis.io/docs/reference/cluster-spec/#hash-tags
func UseHashTagKey() interface{} {
	return hashTagKeyOpt(1)
}

// NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
// callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
	if name == "" {
		panic("name is required")
	}
	if cli == nil {
		panic("cli is required")
	}
	useHashTag := false
	var callback CallbackFunc = nil
	for _, opt := range opts {
		switch o := opt.(type) {
		case hashTagKeyOpt:
			useHashTag = true
		case CallbackFunc:
			callback = o
		}
	}
	var keyPrefix string
	if useHashTag {
		keyPrefix = "{" + name + "}"
	} else {
		keyPrefix = "dp:" + name
	}
	return &DelayQueue{
		name:               name,
		redisCli:           cli,
		cb:                 callback,
		testKey:            keyPrefix + ":test",
		pendingKey:         keyPrefix + ":pending",
		readyKey:           keyPrefix + ":ready",
		unAckKey:           keyPrefix + ":unack",
		retryKey:           keyPrefix + ":retry",
		retryCountKey:      keyPrefix + ":cnt",
		garbageKey:         keyPrefix + ":garbage",
		useHashTag:         useHashTag,
		close:              nil,
		maxConsumeDuration: 5 * time.Second,
		msgTTL:             time.Hour,
		logger:             &DefaultLogger{},
		defaultRetryCount:  3,
		fetchInterval:      time.Second,
		fetchLimit:         100,
		concurrent:         1,
	}
}

// WithCallback set callback for queue to receives and consumes messages
// callback returns true to confirm successfully consumed, false to re-deliver this message
func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue {
	q.cb = callback
	return q
}

// WithLogger customizes logger for queue
func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue {
	q.logger = logger
	return q
}

// WithFetchInterval customizes the interval at which consumer fetch message from redis
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue {
	q.fetchInterval = d
	return q
}

// WithMaxConsumeDuration customizes max consume duration
// If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this message again
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
	q.maxConsumeDuration = d
	return q
}

// WithFetchLimit limits the max number of processing messages, must be greater than 0
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
	q.fetchLimit = limit
	return q
}

// WithConcurrent sets the number of concurrent consumers
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
	if c == 0 {
		return q
	}
	q.concurrent = c
	return q
}

// WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue
// use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
	q.defaultRetryCount = count
	return q
}

func (q *DelayQueue) genMsgKey(idStr string) string {
	return "dp:" + q.name + ":msg:" + idStr
}

type retryCountOpt int

// WithRetryCount set retry count for a msg
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
func WithRetryCount(count int) interface{} {
	return retryCountOpt(count)
}

type msgTTLOpt time.Duration

// WithMsgTTL set ttl for a msg
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(Hour))
func WithMsgTTL(d time.Duration) interface{} {
	return msgTTLOpt(d)
}

// SendScheduleMsg submits a message delivered at given time
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
	// parse options
	retryCount := q.defaultRetryCount
	for _, opt := range opts {
		switch o := opt.(type) {
		case retryCountOpt:
			retryCount = uint(o)
		case msgTTLOpt:
			q.msgTTL = time.Duration(o)
		}
	}
	err := q.redisCli.Pipelined(func(cmd Cmder) error {
		// generate id
		idStr := unique.GetObjectIdHex()
		now := time.Now()
		// store msg
		msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
		q.redisCli.Set(q.genMsgKey(idStr), payload, msgTTL)
		// store retry count
		q.redisCli.HSet(q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
		// put to pending
		q.redisCli.ZAdd(q.pendingKey, map[string]float64{idStr: float64(t.Unix())})
		return nil
	})
	if err != nil {
		return fmt.Errorf("send schedule msg failed: %v", err)
	}

	q.reportEvent(NewMessageEvent, 1)
	return nil
}

// SendDelayMsg submits a message delivered after given duration
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
	now := time.Now()
	t := now.Add(duration)
	return q.SendScheduleMsg(payload, t, opts...)
}

// SendScheduleMsgBatch batch submits a message delivered at given time
func (q *DelayQueue) SendScheduleMsgBatch(payloads []string, t time.Time, batchSize int, opts ...interface{}) error {
	// parse options
	retryCount := q.defaultRetryCount
	for _, opt := range opts {
		switch o := opt.(type) {
		case retryCountOpt:
			retryCount = uint(o)
		case msgTTLOpt:
			q.msgTTL = time.Duration(o)
		}
	}

	msgCount := len(payloads)
	payloadm := make(map[string]string, msgCount)
	uids := make([]string, 0, msgCount)
	for i := range payloads {
		// generate id
		uid := unique.GetObjectIdHex()
		payloadm[uid] = payloads[i]
		uids = append(uids, uid)
	}

	chunkIds := ChunkSlice(uids, batchSize)
	for batch, ids := range chunkIds {
		err := q.redisCli.Pipelined(func(cmd Cmder) error {
			// store msg
			now := time.Now()
			msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
			ForEach(ids, func(index int, id string) {
				q.redisCli.Set(q.genMsgKey(id), payloadm[id], msgTTL)
			})
			// store retry count
			retryCountKeyArg := make([]interface{}, 0, 2*len(ids))
			rc := strconv.Itoa(int(retryCount))
			ForEach(ids, func(index int, id string) { retryCountKeyArg = append(retryCountKeyArg, id, rc) })
			q.redisCli.HSet(q.retryCountKey, retryCountKeyArg...)
			// put to pending
			pendingKeyArg := make(map[string]float64, len(ids))
			score := float64(t.Unix())
			increment := math.SmallestNonzeroFloat64                                                                          // score increment
			ForEach(ids, func(index int, id string) { pendingKeyArg[id] = score + float64(batch*batchSize+index)*increment }) // keep order
			q.redisCli.ZAdd(q.pendingKey, pendingKeyArg)
			return nil
		})
		if err != nil {
			return fmt.Errorf("batch send schedule msg failed: %v", err)
		}
	}

	q.reportEvent(NewMessageEvent, msgCount)

	return nil
}

// SendDelayMsgBatch batch submits a message delivered after given duration
func (q *DelayQueue) SendDelayMsgBatch(payloads []string, duration time.Duration, batchSize int, opts ...interface{}) error {
	now := time.Now()
	t := now.Add(duration)
	return q.SendScheduleMsgBatch(payloads, t, batchSize, opts...)
}

// testCmdScript test lua script cmd: zadd
// keys: testKey
// argv:
// returns: args size:6
const testCmdScript = `
local unpackx = (function() if table.unpack then return table.unpack else return unpack end end)()
local args = {}
for i=1, 3 do
	table.insert(args, ARGV[1]) 
	table.insert(args, tostring(i))
end

redis.call('ZADD', KEYS[1], unpackx(args))
redis.call('DEL', KEYS[1])
return #args
`

func (q *DelayQueue) testCmd() error {
	now := time.Now().Unix()
	keys := []string{q.testKey}
	raw, err := q.redisCli.Eval(testCmdScript, keys, []interface{}{now})
	if err != nil {
		return fmt.Errorf("testCmdScript failed: %v", err)
	}
	count, ok := raw.(int64)
	if !ok || count != 6 {
		return fmt.Errorf("illegal result: %#v", raw)
	}

	q.allowZAddMultipleParams = true

	return nil
}

// pending2ReadyScript atomically moves messages from pending to ready
// keys: pendingKey, readyKey
// argv: currentTime
// returns: ready message number
const pending2ReadyScript = `
local unpackx = (function() if table.unpack then return table.unpack else return unpack end end)()
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get ready msg
if (#msgs == 0) then return end
local args2 = {} -- keys to push into ready
for _,v in ipairs(msgs) do
	table.insert(args2, v) 
    if (#args2 == 4000) then
		redis.call('LPush', KEYS[2], unpackx(args2))
		args2 = {}
	end
end
if (#args2 > 0) then 
	redis.call('LPush', KEYS[2], unpackx(args2))
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from pending
return #msgs
`

func (q *DelayQueue) pending2Ready() error {
	now := time.Now().Unix()
	keys := []string{q.pendingKey, q.readyKey}
	raw, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now})
	if err != nil && err != NilErr {
		return fmt.Errorf("pending2ReadyScript failed: %v", err)
	}
	count, ok := raw.(int64)
	if ok {
		q.reportEvent(ReadyEvent, int(count))
	}
	return nil
}

// ready2UnackScript1 atomically moves messages from ready to unack
// keys: readyKey/retryKey, unackKey
// argv: retryTime, fetchLimit
const ready2UnackScript1 = `
local fetchLimit = tonumber(ARGV[2])
local msgs = redis.call('LRANGE', KEYS[1], 0, fetchLimit - 1) -- get the element in the list that has an index from 0 to fetchLimit-1
if #msgs == 0 then return end
redis.call('LTRIM', KEYS[1], fetchLimit, -1) -- keep fetchLimit to the last element in the list
for i, v in ipairs(msgs) do
	redis.call('ZADD', KEYS[2], ARGV[1], v) -- move msgs to unack
end
return msgs
`

// ready2UnackScript2 atomically moves messages from ready to unack
// keys: readyKey/retryKey, unackKey
// argv: retryTime, fetchLimit
const ready2UnackScript2 = `
local unpackx = (function() if table.unpack then return table.unpack else return unpack end end)()
local fetchLimit = tonumber(ARGV[2])
local msgs = redis.call('LRANGE', KEYS[1], 0, fetchLimit - 1) -- get the element in the list that has an index from 0 to fetchLimit-1
if #msgs == 0 then return end
redis.call('LTRIM', KEYS[1], fetchLimit, -1) -- keep fetchLimit to the last element in the list
local args = {}
local batchSize = 500
for i, v in ipairs(msgs) do
    table.insert(args, ARGV[1]) 
	table.insert(args, v)
    if i % batchSize == 0 or i == #msgs then
		redis.call('ZADD', KEYS[2], unpackx(args)) -- move msgs to unack
        args = {} 
    end
end
return msgs
`

func (q *DelayQueue) ready2Unack() ([]string, error) {
	script := ready2UnackScript1
	if q.allowZAddMultipleParams {
		script = ready2UnackScript2
	}

	retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
	keys := []string{q.readyKey, q.unAckKey}
	ret, err := q.redisCli.Eval(script, keys, []interface{}{retryTime, q.fetchLimit})
	if err == NilErr {
		return nil, err
	}
	if err != nil {
		return nil, fmt.Errorf("ready2UnackScript failed: %v", err)
	}

	list, ok := ret.([]interface{})
	if !ok {
		return nil, fmt.Errorf("illegal result: %#v", ret)
	}

	ids := make([]string, 0, len(list))
	for _, v := range list {
		ids = append(ids, v.(string))
	}

	q.reportEvent(DeliveredEvent, len(ids))
	return ids, nil
}

func (q *DelayQueue) retry2Unack() ([]string, error) {
	script := ready2UnackScript1
	if q.allowZAddMultipleParams {
		script = ready2UnackScript2
	}

	retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
	keys := []string{q.retryKey, q.unAckKey}
	ret, err := q.redisCli.Eval(script, keys, []interface{}{retryTime, q.fetchLimit})
	if err == NilErr {
		return nil, err
	}
	if err != nil {
		return nil, fmt.Errorf("retry2UnackScript failed: %v", err)
	}

	list, ok := ret.([]interface{})
	if !ok {
		return nil, fmt.Errorf("illegal result: %#v", ret)
	}

	ids := make([]string, 0, len(list))
	for _, v := range list {
		ids = append(ids, v.(string))
	}

	return ids, nil
}

func (q *DelayQueue) callback(idStr string) error {
	payload, err := q.redisCli.Get(q.genMsgKey(idStr))
	if err == NilErr {
		return nil
	}
	if err != nil {
		// Is an IO error?
		return fmt.Errorf("get message payload failed: %v", err)
	}
	ack := q.cb(payload)
	if ack {
		err = q.ack(idStr)
	} else {
		err = q.nack(idStr)
	}
	return err
}

// batchCallback calls DelayQueue.callback in batch. callback is executed concurrently according to property DelayQueue.concurrent
// batchCallback must wait all callback finished, otherwise the actual number of processing messages may beyond DelayQueue.FetchLimit
func (q *DelayQueue) batchCallback(ids []string) {
	if len(ids) == 1 || q.concurrent == 1 {
		for _, id := range ids {
			err := q.callback(id)
			if err != nil {
				q.logger.Errorf("consume msg %s failed: %v", id, err)
			}
		}
		return
	}
	ch := make(chan string, len(ids))
	for _, id := range ids {
		ch <- id
	}
	close(ch)
	wg := sync.WaitGroup{}
	concurrent := int(q.concurrent)
	if concurrent > len(ids) { // too many goroutines is no use
		concurrent = len(ids)
	}
	wg.Add(concurrent)
	for i := 0; i < concurrent; i++ {
		ants.Submit(func() {
			defer wg.Done()
			for id := range ch {
				err := q.callback(id)
				if err != nil {
					q.logger.Errorf("consume msg %s failed: %v", id, err)
				}
			}
		})
	}
	wg.Wait()
}

func (q *DelayQueue) ack(idStr string) error {
	err := q.redisCli.Pipelined(func(cmd Cmder) error {
		cmd.ZRem(q.unAckKey, []string{idStr})
		// msg key has ttl, ignore result of delete
		cmd.Del([]string{q.genMsgKey(idStr)})
		cmd.HDel(q.retryCountKey, []string{idStr})
		return nil
	})

	if err != nil {
		return fmt.Errorf("ack failed: %v", err)
	}

	q.reportEvent(AckEvent, 1)
	return nil
}

func (q *DelayQueue) nack(idStr string) error {
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
		idStr: float64(time.Now().Unix()),
	})
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	q.reportEvent(NackEvent, 1)
	return nil
}

// unack2RetryScript atomically moves messages from unack to retry which remaining retry count greater than 0,
// and moves messages from unack to garbage which  retry count is 0
// Because DelayQueue cannot determine garbage message before eval unack2RetryScript, so it cannot pass keys parameter to redisCli.Eval
// Therefore unack2RetryScript moves garbage message to garbageKey instead of deleting directly
// keys: unackKey, retryCountKey, retryKey, garbageKey
// argv: currentTime
// returns: {retryMsgs, failMsgs}
const unack2RetryScript = `
local unpackx = (function() if table.unpack then return table.unpack else return unpack end end)()
local unack2retry = function(msgs)
	local retryCounts = redis.call('HMGet', KEYS[2], unpackx(msgs)) -- get retry count
	local retryMsgs = 0
	local failMsgs = 0
	for i,v in ipairs(retryCounts) do
		local k = msgs[i]
		if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
			redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
			redis.call("LPush", KEYS[3], k) -- add to retry
			retryMsgs = retryMsgs + 1
		else
			redis.call("HDel", KEYS[2], k) -- del retry count
			redis.call("SAdd", KEYS[4], k) -- add to garbage
			failMsgs = failMsgs + 1
		end
	end
	return retryMsgs, failMsgs
end

local retryMsgs = 0
local failMsgs = 0
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get retry msg
if (#msgs == 0) then return end
if #msgs < 4000 then
	local d1, d2 = unack2retry(msgs)
	retryMsgs = retryMsgs + d1
	failMsgs = failMsgs + d2
else
	local buf = {}
	for _,v in ipairs(msgs) do
		table.insert(buf, v)
		if #buf == 4000 then
		    local d1, d2 = unack2retry(buf)
			retryMsgs = retryMsgs + d1
			failMsgs = failMsgs + d2
			buf = {}
		end
	end
	if (#buf > 0) then
		local d1, d2 = unack2retry(buf)
		retryMsgs = retryMsgs + d1
		failMsgs = failMsgs + d2
	end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from unack
return {retryMsgs, failMsgs}
`

func (q *DelayQueue) unack2Retry() error {
	keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
	now := time.Now()
	raw, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()})
	if err != nil && err != NilErr {
		return fmt.Errorf("unack to retry script failed: %v", err)
	}
	infos, ok := raw.([]interface{})
	if ok && len(infos) == 2 {
		retryCount, ok := infos[0].(int64)
		if ok {
			q.reportEvent(RetryEvent, int(retryCount))
		}
		failCount, ok := infos[1].(int64)
		if ok {
			q.reportEvent(FinalFailedEvent, int(failCount))
		}
	}
	return nil
}

func (q *DelayQueue) garbageCollect() error {
	msgIds, err := q.redisCli.SMembers(q.garbageKey)
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}

	err = q.redisCli.Pipelined(func(cmd Cmder) error {
		// msg key has ttl, ignore result of delete
		cmd.Del(msgKeys)
		cmd.SRem(q.garbageKey, msgIds)
		return nil
	})
	if err != nil {
		return fmt.Errorf("garbage collect failed: %v", err)
	}
	return nil
}

func (q *DelayQueue) consume() error {
	var err error
	// pending to ready
	if q.quit == 0 {
		err = q.pending2Ready()
		if err != nil {
			return err
		}
	}

	// consume
	ids := make([]string, 0, q.fetchLimit)
	if q.quit == 0 {
		ids, err = q.ready2Unack()
		if err != nil && err != NilErr {
			return err
		}
	}

	if len(ids) > 0 {
		q.batchCallback(ids)
	}

	if q.quit == 0 {
		// unack to retry
		err := q.unack2Retry()
		if err != nil {
			return err
		}

		err = q.garbageCollect()
		if err != nil {
			return err
		}
	}

	// retry
	ids = make([]string, 0, q.fetchLimit)
	if q.quit == 0 {
		ids, err = q.retry2Unack()
		if err != nil && err != NilErr {
			return err
		}
	}

	if len(ids) > 0 {
		q.batchCallback(ids)
	}
	return nil
}

func (q *DelayQueue) Consume() error {
	return q.consume()
}

// StartConsume creates a goroutine to consume message from DelayQueue
// use `<-done` to wait consumer stopping
// If there is no callback set, StartConsume will panic
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
	if q.cb == nil {
		panic("this instance has no callback")
	}
	q.close = make(chan struct{}, 1)
	q.ticker = time.NewTicker(q.fetchInterval)
	done0 := make(chan struct{})

	err := q.testCmd()
	log.Printf("testCmd error: %v", err)

	go func() {
	tickerLoop:
		for {
			select {
			case <-q.ticker.C:
				err := q.consume()
				if err != nil {
					log.Printf("consume error: %v", err)
				}
			case <-q.close:
				break tickerLoop
			}
		}
		close(done0)
	}()
	return done0
}

// StopConsume stops consumer goroutine
func (q *DelayQueue) StopConsume() {
	close(q.close)
	q.quit = 1
	if q.ticker != nil {
		q.ticker.Stop()
	}
}

// GetPendingCount returns the number of pending messages
func (q *DelayQueue) GetPendingCount() (int64, error) {
	return q.redisCli.ZCard(q.pendingKey)
}

// GetReadyCount returns the number of messages which have arrived delivery time but but have not been delivered
func (q *DelayQueue) GetReadyCount() (int64, error) {
	return q.redisCli.LLen(q.readyKey)
}

// GetProcessingCount returns the number of messages which are being processed
func (q *DelayQueue) GetProcessingCount() (int64, error) {
	return q.redisCli.ZCard(q.unAckKey)
}

// EventListener which will be called when events occur
// This Listener can be used to monitor running status
type EventListener interface {
	// OnEvent will be called when events occur
	OnEvent(*Event)
}

// ListenEvent register a listener which will be called when events occur,
// so it can be used to monitor running status
//
// But It can ONLY receive events from the CURRENT INSTANCE,
// if you want to listen to all events in queue, just use Monitor.ListenEvent
//
// There can be AT MOST ONE EventListener in an DelayQueue instance.
// If you are using customized listener, Monitor will stop working
func (q *DelayQueue) ListenEvent(listener EventListener) {
	q.eventListener = listener
}

// RemoveListener stops reporting events to EventListener
func (q *DelayQueue) DisableListener() {
	q.eventListener = nil
}

func (q *DelayQueue) reportEvent(code int, count int) {
	listener := q.eventListener // eventListener may be changed during running
	if listener != nil && count > 0 {
		event := &Event{
			Code:      code,
			Timestamp: time.Now().Unix(),
			MsgCount:  count,
		}
		listener.OnEvent(event)
	}
}

// pubsubListener receives events and reports them through redis pubsub for monitoring
type pubsubListener struct {
	redisCli   RedisCli
	reportChan string
}

func genReportChannel(name string) string {
	return "dq:" + name + ":reportEvents"
}

// EnableReport enables reporting to monitor
func (q *DelayQueue) EnableReport() {
	reportChan := genReportChannel(q.name)
	q.ListenEvent(&pubsubListener{
		redisCli:   q.redisCli,
		reportChan: reportChan,
	})
}

// DisableReport stops reporting to monitor
func (q *DelayQueue) DisableReport() {
	q.DisableListener()
}

func (l *pubsubListener) OnEvent(event *Event) {
	payload := encodeEvent(event)
	l.redisCli.Publish(l.reportChan, payload)
}

func ChunkSlice[S ~[]E, E comparable](slice S, chunkSize int) (dividedSlice []S) {
	for i := 0; i < len(slice); i += chunkSize {
		end := i + chunkSize

		if end > len(slice) {
			end = len(slice)
		}

		dividedSlice = append(dividedSlice, slice[i:end])
	}

	return dividedSlice
}

func ForEach[S ~[]E, E any](list S, do func(index int, value E)) {
	for i, _ := range list {
		do(i, list[i])
	}
}

func DefaultValue[T comparable](val T, def T) T {
	var zero T
	if val == zero {
		return def
	}
	return val
}
